from typing import Union, List, Dict, Tuple
import asyncio
import re 
import platform
import socket
import time
import os

import json
import datetime
import subprocess
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from socket import timeout
import urllib
import psutil
import tempfile
import random


from mcp.server import FastMCP



# Create an MCP server
mcp = FastMCP("Perf进程性能分析助手")



def run_command(command, shell=True, check=True, capture_output=False, cwd=None, 
                timeout=None, stdout=None, stderr=None):
    """执行系统命令并处理结果（增强版，支持超时和输出重定向）"""
    try:
        result = subprocess.run(
            command,
            shell=shell,
            check=check,
            capture_output=capture_output,
            text=True,
            cwd=cwd,
            timeout=timeout,
            stdout=stdout,
            stderr=stderr
        )
        return result
    except subprocess.CalledProcessError as e:
        print(f"命令执行失败: {command}")
        print(f"错误输出: {e.stderr}")
        raise e
    except subprocess.TimeoutExpired:
        print(f"命令执行超时: {command}")
        raise
    except Exception as e:
        print(f"命令执行异常: {command}, 错误: {str(e)}")
        raise e




def perf_collect_tool(process: Union[int, str]) -> str:
    """
    接收用户输入的进程名称或PID，使用perf工具采集指定进程的性能数据并输出为perf.data
    当用户询问指定进程的性能数据时调用，或perf安装工具成功执行后调用
    
    参数:
        process: 进程PID（整数/字符串）或进程名称（字符串，支持模糊匹配）
        
    返回:
        执行结果信息（成功/错误提示）
    """
    # 1. 解析目标进程PID
    pid_int = None
    
    # 尝试将输入解析为PID
    try:
        pid_int = int(process)
    except ValueError:
        # 解析失败，视为进程名称进行模糊匹配
        process_name = str(process)
        try:
            # 使用pgrep进行模糊匹配（不添加-x选项）
            result = run_command(
                f"pgrep -f '{process_name}'",  # 单引号包裹避免特殊字符问题
                shell=True,
                check=True,
                capture_output=True
            )
            
            # 提取所有匹配的PID（按行分割）
            pids = [p.strip() for p in result.stdout.strip().split('\n') if p.strip()]
            if not pids:
                return f"错误: 未找到包含 '{process_name}' 的进程"
            
            # 选取第一个PID，若有多个则提示
            pid_int = int(pids[0])
                
        except subprocess.CalledProcessError:
            return f"错误: 未找到包含 '{process_name}' 的进程"
        except Exception as e:
            return f"错误: 查找进程PID时发生异常 - {str(e)}"
    
    # 验证PID有效性（简单检查是否为正整数）
    if not isinstance(pid_int, int) or pid_int <= 0:
        return f"错误: 无效的PID值 {pid_int}"
    
    # 2. 处理已存在的perf.data文件
    perf_data_path = os.path.join(os.getcwd(), "perf.data")
    if os.path.exists(perf_data_path):
        try:
            os.remove(perf_data_path)
        except PermissionError:
            return "错误: 没有权限删除已存在的perf.data文件"
        except Exception as e:
            return f"错误: 删除perf.data时发生异常 - {str(e)}"
    
    # 3. 执行perf采集命令
    perf_cmd = [
        'perf', 'record', 
        '-p', str(pid_int),
        '-o', 'perf.data',
        '-g',
        '--', 'sleep', '5'  # 采集5秒数据
    ]
    
    try:
        result = run_command(
            perf_cmd,
            shell=False,
            check=False,
            capture_output=True
        )
        
        if result.returncode == 0:
            if os.path.exists(perf_data_path) and os.path.getsize(perf_data_path) > 0:
                return f"成功: 已采集PID {pid_int} 的性能数据，保存至 perf.data"
            else:
                return "警告: 命令执行成功，但未生成有效的perf.data文件"
        else:
            error_msg = result.stderr.strip() or "未知错误"
            return f"错误: perf命令执行失败（返回码 {result.returncode}）- {error_msg}"
    
    except FileNotFoundError:
        return "错误: 未找到perf工具，请先安装"
    except Exception as e:
        return f"错误: 采集过程发生异常 - {str(e)}"



def install_perf_tool(timeout_seconds: int = 300) -> Tuple[bool, Optional[str]]:
    """
    使用yum安装perf工具，添加超时控制避免请求超时
    仅在perf采集工具返回'错误: 未找到perf工具，请确保已安装perf'时调用
    
    参数:
        timeout_seconds: 安装命令超时时间（默认300秒，可根据网络调整）
    返回:
        Tuple[bool, Optional[str]]: 安装结果及信息
    """
    try:
        # 执行yum安装（使用yum包管理器）
        install_cmd = ['sudo', 'yum', 'install', '-y', 'perf']
        
        # 执行安装（超时控制）
        install_result = run_command(
            install_cmd,
            shell=False,
            check=False,
            capture_output=True,
            timeout=timeout_seconds
        )
        
        if install_result.returncode == 0:
            return (True, "perf安装成功")
        else:
            error_msg = install_result.stderr.strip() or "未知错误"
            return (False, f"安装失败（返回码：{install_result.returncode}）：{error_msg}")
    
    except subprocess.TimeoutExpired:
        # 捕获超时异常，明确提示yum安装命令
        return (False, f"安装超时（超过{timeout_seconds}秒）。建议手动执行命令安装：\nsudo yum install -y perf")
    except PermissionError:
        return (False, "权限不足，请用sudo运行脚本")
    except FileNotFoundError:
        return (False, "未找到yum或sudo命令，无法安装")
    except Exception as e:
        return (False, f"安装出错：{str(e)}")




def generate_flamegraph_tool() -> Tuple[bool, Optional[str]]:
    """
    处理当前工作目录下的perf.data文件，生成火焰图perf.svg（无打印输出）
    
    处理流程:
    1. 检查火焰图生成所需工具是否存在，不存在则自动安装到当前目录FlameGraph文件夹
    2. 检查当前目录下是否存在perf.data文件
    3. 将perf.data解析为perf.unfold
    4. 对perf.unfold进行符号折叠
    5. 转换为火焰图perf.svg并保存在当前目录
    6. 覆盖已存在的同名文件
    
    返回:
        Tuple[bool, Optional[str]]: 包含两个元素的元组：
            - 第一个元素：bool 类型，表示火焰图生成是否成功
            - 第二个元素：str 类型，生成成功时为SVG文件的绝对路径，
                         生成失败时为错误信息
    """
    # 定义需要的工具路径和仓库地址
    STACKCOLLAPSE_TOOL = "stackcollapse-perf.pl"
    FLAMEGRAPH_TOOL = "flamegraph.pl"
    FLAMEGRAPH_REPO = "https://mirrors.tuna.tsinghua.edu.cn/git/brendangregg/FlameGraph.git"
    # 在当前工作目录创建FlameGraph目录
    FLAMEGRAPH_DIR = os.path.join(os.getcwd(), "FlameGraph")
    
    # 第一步：检查并安装火焰图工具
    def check_tool(tool_name):
        """检查工具是否存在"""
        try:
            run_command(
                [tool_name, "--version"],
                shell=False,
                check=True,
                capture_output=True,
                timeout=10
            )
            return True
        except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired):
            return False
    
    def install_flamegraph_tools():
        """安装FlameGraph工具集到当前目录的FlameGraph文件夹"""
        try:
            # 检查git是否安装
            try:
                run_command(
                    ["git", "--version"],
                    shell=False,
                    check=True,
                    capture_output=True,
                    timeout=10
                )
            except (FileNotFoundError, subprocess.CalledProcessError):
                return False, "未找到git命令，请先安装git"
            
            # 清理已存在的目录（如果存在）
            if os.path.exists(FLAMEGRAPH_DIR):
                try:
                    import shutil
                    shutil.rmtree(FLAMEGRAPH_DIR)
                except Exception as e:
                    return False, f"清理已有FlameGraph目录失败: {str(e)}"
            
            # 创建目录
            try:
                os.makedirs(FLAMEGRAPH_DIR, exist_ok=True)
            except PermissionError:
                return False, f"没有权限创建目录: {FLAMEGRAPH_DIR}"
            except Exception as e:
                return False, f"创建FlameGraph目录失败: {str(e)}"
            
            # 克隆仓库到指定目录
            clone_cmd = ["git", "clone", FLAMEGRAPH_REPO, FLAMEGRAPH_DIR]
            try:
                run_command(
                    clone_cmd,
                    shell=False,
                    check=True,
                    capture_output=True,
                    timeout=300
                )
            except subprocess.CalledProcessError as e:
                return False, f"克隆FlameGraph仓库失败: {e.stderr.strip()}"
            
            # 检查工具文件是否存在
            stackcollapse_path = os.path.join(FLAMEGRAPH_DIR, STACKCOLLAPSE_TOOL)
            flamegraph_path = os.path.join(FLAMEGRAPH_DIR, FLAMEGRAPH_TOOL)
            
            if not (os.path.exists(stackcollapse_path) and os.path.exists(flamegraph_path)):
                return False, "FlameGraph工具文件缺失"
            
            # 将工具路径添加到环境变量
            os.environ["PATH"] = f"{FLAMEGRAPH_DIR}:{os.environ['PATH']}"
            return True, f"FlameGraph工具安装成功，路径: {FLAMEGRAPH_DIR}"
                
        except Exception as e:
            return False, f"安装FlameGraph工具失败: {str(e)}"
    
    # 检查工具是否存在，不存在则安装
    if not (check_tool(STACKCOLLAPSE_TOOL) and check_tool(FLAMEGRAPH_TOOL)):
        install_success, install_msg = install_flamegraph_tools()
        if not install_success:
            return (False, f"火焰图工具缺失，安装失败: {install_msg}")
    
    # 定义文件路径
    perf_data_path = os.path.join(os.getcwd(), "perf.data")
    unfold_path = os.path.join(os.getcwd(), "perf.unfold")
    svg_path = os.path.join(os.getcwd(), "perf.svg")
    
    # 检查perf.data是否存在
    if not os.path.exists(perf_data_path):
        return (False, f"未找到perf.data文件，路径：{perf_data_path}")
    
    # 检查文件是否为空
    if os.path.getsize(perf_data_path) == 0:
        return (False, "perf.data文件为空")
    
    folded_path = None  # 初始化临时折叠文件路径变量
    try:
        # 第一步：将perf.data解析为perf.unfold
        try:
            with open(unfold_path, 'w') as unfold_file:
                run_command(
                    ['perf', 'script', '-i', 'perf.data'],
                    shell=False,
                    check=True,
                    capture_output=False,
                    stdout=unfold_file,
                    stderr=subprocess.PIPE
                )
            
            if not os.path.exists(unfold_path) or os.path.getsize(unfold_path) == 0:
                return (False, "生成perf.unfold文件失败或文件为空")
                
        except subprocess.CalledProcessError as e:
            error_msg = f"解析perf.data失败: {e.stderr.strip()}"
            return (False, error_msg)
        except Exception as e:
            return (False, f"生成perf.unfold时发生错误: {str(e)}")
        
        # 第二步：符号折叠处理
        try:
            # 获取工具的绝对路径
            stackcollapse_abs_path = os.path.join(FLAMEGRAPH_DIR, STACKCOLLAPSE_TOOL)
            with tempfile.NamedTemporaryFile(delete=False, suffix='.folded') as folded_file:
                folded_path = folded_file.name
                
                run_command(
                    [stackcollapse_abs_path, unfold_path],
                    shell=False,
                    check=True,
                    capture_output=False,
                    stdout=folded_file,
                    stderr=subprocess.PIPE
                )
                
        except subprocess.CalledProcessError as e:
            error_msg = f"符号折叠失败: {e.stderr.strip()}"
            return (False, error_msg)
        except FileNotFoundError:
            return (False, f"未找到{STACKCOLLAPSE_TOOL}工具，请手动检查FlameGraph目录")
        except Exception as e:
            return (False, f"符号折叠时发生错误: {str(e)}")
        
        # 第三步：生成火焰图
        try:
            # 确保覆盖已有文件
            if os.path.exists(svg_path):
                os.remove(svg_path)
            
            with open(svg_path, 'w') as svg_file:
                run_command(
                    [FLAMEGRAPH_TOOL, folded_path],
                    shell=False,
                    check=True,
                    capture_output=False,
                    stdout=svg_file,
                    stderr=subprocess.PIPE
                )
            
            if not os.path.exists(svg_path) or os.path.getsize(svg_path) == 0:
                return (False, "生成perf.svg文件失败或文件为空")
                
            return (True, os.path.abspath(svg_path))
            
        except subprocess.CalledProcessError as e:
            error_msg = f"生成火焰图失败: {e.stderr.strip()}"
            return (False, error_msg)
        except FileNotFoundError:
            return (False, f"未找到{FLAMEGRAPH_TOOL}工具，请手动检查FlameGraph目录")
        except Exception as e:
            return (False, f"生成火焰图时发生错误: {str(e)}")
            
    finally:
        # 清理临时折叠文件
        if folded_path and os.path.exists(folded_path):
            try:
                os.remove(folded_path)
            except Exception:
                pass  # 不处理清理失败，避免产生输出




if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='sse')